// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package typeutil

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"math"
	"reflect"
	"sort"
	"strconv"
	"unsafe"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/log"
)

// DynamicFieldMaxLength is the size, in bytes, assumed for a single Array,
// JSON or Geometry value when estimating record sizes (see getVarFieldLength).
const DynamicFieldMaxLength = 512

// getVariableFieldLengthPolicy selects how the length of a VarChar field is
// derived from its declared max_length when estimating sizes.
type getVariableFieldLengthPolicy int

// The numeric values are fixed: max = 0, avg = 1, custom = 2.
const (
	// max uses the declared max_length unchanged.
	max getVariableFieldLengthPolicy = iota
	// avg uses half of the declared max_length.
	avg
	// custom uses the declared max_length capped at 256.
	custom
)

// getVarFieldLength returns the estimated per-row length of a variable-length
// field.
//
// VarChar fields require a max_length type param; the result is derived from it
// according to policy (max: as is, avg: half, custom: capped at 256). Array,
// JSON and Geometry fields always yield DynamicFieldMaxLength. Any other data
// type, a missing or non-numeric max_length, or an unknown policy is an error.
func getVarFieldLength(fieldSchema *schemapb.FieldSchema, policy getVariableFieldLengthPolicy) (int, error) {
	// Scan every type param up front; when max_length appears more than once
	// the last occurrence wins.
	rawMaxLength, hasMaxLength := "", false
	for _, param := range fieldSchema.TypeParams {
		if param.Key == common.MaxLengthKey {
			rawMaxLength, hasMaxLength = param.Value, true
		}
	}

	switch fieldSchema.DataType {
	case schemapb.DataType_Array, schemapb.DataType_JSON, schemapb.DataType_Geometry:
		// Geometry is currently sized the same as JSON: 512 bytes.
		return DynamicFieldMaxLength, nil
	case schemapb.DataType_VarChar:
		// handled below
	default:
		return 0, fmt.Errorf("field %s is not a variable-length type", fieldSchema.DataType.String())
	}

	if !hasMaxLength {
		return 0, fmt.Errorf("the max_length was not specified, field type is %s", fieldSchema.DataType.String())
	}
	declared, err := strconv.Atoi(rawMaxLength)
	if err != nil {
		return 0, err
	}

	if policy == max {
		return declared, nil
	}
	if policy == avg {
		return declared / 2, nil
	}
	if policy == custom {
		// TODO this is a hack and may not be accurate, we should rely on the estimated size per record.
		// However we should report size and let datacoord calculate based on size.
		// https://github.com/milvus-io/milvus/issues/17687
		if declared > 256 {
			return 256, nil
		}
		return declared, nil
	}
	return 0, fmt.Errorf("unrecognized getVariableFieldLengthPolicy %v", policy)
}

// EstimateSizePerRecord returns the estimated size of one record of the
// collection, sizing VarChar fields with the custom policy.
func EstimateSizePerRecord(schema *schemapb.CollectionSchema) (int, error) {
	size, err := estimateSizeBy(schema, custom)
	return size, err
}

// EstimateMaxSizePerRecord returns the estimated size of one record of the
// collection, sizing VarChar fields with the max policy.
func EstimateMaxSizePerRecord(schema *schemapb.CollectionSchema) (int, error) {
	size, err := estimateSizeBy(schema, max)
	return size, err
}

// EstimateAvgSizePerRecord returns the estimated size of one record of the
// collection, sizing VarChar fields with the avg policy.
func EstimateAvgSizePerRecord(schema *schemapb.CollectionSchema) (int, error) {
	size, err := estimateSizeBy(schema, avg)
	return size, err
}

// estimateSizeBy sums a per-field size estimate over every field of schema.
//
// Fixed-width scalars contribute their width, variable-length fields contribute
// getVarFieldLength(field, policy), and dense/binary vectors contribute a size
// derived from their dim type param (nothing when dim is absent). Fields of any
// other type contribute nothing. A non-numeric dim yields (-1, err); a
// variable-length failure yields (0, err).
func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLengthPolicy) (int, error) {
	// firstDim parses the first dim type param of a field; found is false when
	// the field declares no dim at all.
	firstDim := func(field *schemapb.FieldSchema) (dim int, found bool, err error) {
		for _, param := range field.TypeParams {
			if param.Key != common.DimKey {
				continue
			}
			dim, err = strconv.Atoi(param.Value)
			return dim, true, err
		}
		return 0, false, nil
	}

	total := 0
	for _, field := range schema.Fields {
		switch dataType := field.DataType; dataType {
		case schemapb.DataType_Bool, schemapb.DataType_Int8:
			total += 1
		case schemapb.DataType_Int16:
			total += 2
		case schemapb.DataType_Int32, schemapb.DataType_Float:
			total += 4
		case schemapb.DataType_Int64, schemapb.DataType_Double:
			total += 8
		// Geometry (WKB as well as WKT) currently shares the maximum length of
		// the other variable-length fields; consider extending it if needed.
		case schemapb.DataType_VarChar, schemapb.DataType_Array, schemapb.DataType_JSON, schemapb.DataType_Geometry:
			length, err := getVarFieldLength(field, policy)
			if err != nil {
				return 0, err
			}
			total += length
		case schemapb.DataType_BinaryVector, schemapb.DataType_FloatVector,
			schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
			dim, found, err := firstDim(field)
			if err != nil {
				return -1, err
			}
			if !found {
				continue
			}
			switch dataType {
			case schemapb.DataType_BinaryVector:
				total += dim / 8
			case schemapb.DataType_FloatVector:
				total += dim * 4
			default:
				// Float16Vector and BFloat16Vector.
				total += dim * 2
			}
		case schemapb.DataType_SparseFloatVector:
			// TODO(SPARSE, zhengbuqian): the size of a sparse float vector
			// varies with the number of non-zeros. Using sparse vectors
			// generated by SPLADE as reference and returning the size of a
			// sparse vector with 150 non-zeros.
			total += 1200
		}
	}
	return total, nil
}

// CalcColumnSize returns the total payload size of a scalar column.
//
// Fixed-width types count element-count times width (Bool and Int8 count one
// per element), VarChar/JSON/Geometry sum the lengths of their values, and
// Array sums the size of each row computed with the array's element type.
// It panics for any other data type.
func CalcColumnSize(column *schemapb.FieldData) int {
	scalars := column.GetScalars()
	switch column.GetType() {
	case schemapb.DataType_Bool:
		return len(scalars.GetBoolData().GetData())
	case schemapb.DataType_Int8:
		return len(scalars.GetIntData().GetData())
	case schemapb.DataType_Int16:
		return 2 * len(scalars.GetIntData().GetData())
	case schemapb.DataType_Int32:
		return 4 * len(scalars.GetIntData().GetData())
	case schemapb.DataType_Int64:
		return 8 * len(scalars.GetLongData().GetData())
	case schemapb.DataType_Float:
		return 4 * len(scalars.GetFloatData().GetData())
	case schemapb.DataType_Double:
		return 8 * len(scalars.GetDoubleData().GetData())
	case schemapb.DataType_VarChar:
		total := 0
		for _, value := range scalars.GetStringData().GetData() {
			total += len(value)
		}
		return total
	case schemapb.DataType_Array:
		arrays := scalars.GetArrayData()
		elementType := arrays.GetElementType()
		total := 0
		for _, row := range arrays.GetData() {
			// Each row is itself a scalar column of the element type.
			total += CalcColumnSize(&schemapb.FieldData{
				Type:  elementType,
				Field: &schemapb.FieldData_Scalars{Scalars: row},
			})
		}
		return total
	case schemapb.DataType_JSON:
		total := 0
		for _, value := range scalars.GetJsonData().GetData() {
			total += len(value)
		}
		return total
	case schemapb.DataType_Geometry:
		total := 0
		for _, value := range scalars.GetGeometryData().GetData() {
			total += len(value)
		}
		return total
	}
	panic("Unknown data type:" + column.Type.String())
}

// EstimateEntitySize estimates the size of the row at rowOffset across the
// given columns.
//
// Fixed-width scalars contribute their width; VarChar, JSON and Geometry
// contribute the length of the addressed value; Array contributes
// CalcColumnSize of the addressed row; dense vectors contribute a size derived
// from the column's dim; sparse float vectors contribute the length of the
// addressed row's content.
//
// An error is returned when rowOffset does not address an existing row of a
// variable-length or sparse-vector column (including a negative rowOffset).
// The function panics for unknown data types.
func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, error) {
	// rowExists reports whether rowOffset addresses one of rowCount rows.
	rowExists := func(rowCount int) bool {
		return rowOffset >= 0 && rowOffset < rowCount
	}
	outOfRange := func() error {
		return errors.New("offset out range of field datas")
	}

	res := 0
	for _, fs := range fieldsData {
		switch fs.GetType() {
		case schemapb.DataType_Bool, schemapb.DataType_Int8:
			res++
		case schemapb.DataType_Int16:
			res += 2
		case schemapb.DataType_Int32, schemapb.DataType_Float:
			res += 4
		case schemapb.DataType_Int64, schemapb.DataType_Double:
			res += 8
		case schemapb.DataType_VarChar:
			rows := fs.GetScalars().GetStringData().GetData()
			if !rowExists(len(rows)) {
				return 0, outOfRange()
			}
			res += len(rows[rowOffset])
		case schemapb.DataType_Array:
			arrays := fs.GetScalars().GetArrayData()
			rows := arrays.GetData()
			if !rowExists(len(rows)) {
				return 0, outOfRange()
			}
			res += CalcColumnSize(&schemapb.FieldData{
				Field: &schemapb.FieldData_Scalars{Scalars: rows[rowOffset]},
				Type:  arrays.GetElementType(),
			})
		case schemapb.DataType_JSON:
			rows := fs.GetScalars().GetJsonData().GetData()
			if !rowExists(len(rows)) {
				return 0, outOfRange()
			}
			res += len(rows[rowOffset])
		case schemapb.DataType_Geometry:
			rows := fs.GetScalars().GetGeometryData().GetData()
			if !rowExists(len(rows)) {
				return 0, outOfRange()
			}
			res += len(rows[rowOffset])
		case schemapb.DataType_BinaryVector:
			// NOTE(review): this counts dim bytes, whereas estimateSizeBy uses
			// dim/8 for binary vectors. Kept as is because callers may depend
			// on the current value; confirm which one is intended.
			res += int(fs.GetVectors().GetDim())
		case schemapb.DataType_FloatVector:
			res += int(fs.GetVectors().GetDim() * 4)
		case schemapb.DataType_Float16Vector:
			res += int(fs.GetVectors().GetDim() * 2)
		case schemapb.DataType_BFloat16Vector:
			res += int(fs.GetVectors().GetDim() * 2)
		case schemapb.DataType_SparseFloatVector:
			// Count only the size of the vector data, ignoring other bytes
			// used in proto. The nil-safe getter plus the range check turn a
			// missing column or row into an error instead of a panic.
			rows := fs.GetVectors().GetSparseFloatVector().GetContents()
			if !rowExists(len(rows)) {
				return 0, outOfRange()
			}
			res += len(rows[rowOffset])
		default:
			panic("Unknown data type:" + fs.GetType().String())
		}
	}
	return res, nil
}

// SchemaHelper provides methods to get the schema of fields.
// It indexes a CollectionSchema by field name and field ID and remembers the
// positions of the special fields. Offsets are indexes into schema.Fields;
// an offset of -1 means the schema has no such field.
type SchemaHelper struct {
	schema              *schemapb.CollectionSchema
	nameOffset          map[string]int // field name -> index into schema.Fields
	idOffset            map[int64]int  // field ID -> index into schema.Fields
	primaryKeyOffset    int
	partitionKeyOffset  int
	clusteringKeyOffset int
	dynamicFieldOffset  int
	loadFields          Set[int64] // IDs of loaded fields; when empty every field is treated as loaded
}

// CreateSchemaHelperWithLoadFields builds a SchemaHelper for schema and records
// loadFields as the set of loaded field IDs.
//
// It returns an error when schema is nil, when two fields share a name or an
// ID, or when more than one field is flagged as primary key, partition key,
// clustering key or dynamic field.
func CreateSchemaHelperWithLoadFields(schema *schemapb.CollectionSchema, loadFields []int64) (*SchemaHelper, error) {
	if schema == nil {
		return nil, errors.New("schema is nil")
	}

	helper := &SchemaHelper{
		schema:              schema,
		nameOffset:          make(map[string]int),
		idOffset:            make(map[int64]int),
		primaryKeyOffset:    -1,
		partitionKeyOffset:  -1,
		clusteringKeyOffset: -1,
		dynamicFieldOffset:  -1,
		loadFields:          NewSet(loadFields...),
	}

	// claim stores offset into slot, failing if the slot is already taken.
	claim := func(slot *int, offset int, role string) error {
		if *slot != -1 {
			return errors.New(role + " is not unique")
		}
		*slot = offset
		return nil
	}

	for offset, field := range schema.Fields {
		if _, seen := helper.nameOffset[field.Name]; seen {
			return nil, fmt.Errorf("duplicated fieldName: %s", field.Name)
		}
		if _, seen := helper.idOffset[field.FieldID]; seen {
			return nil, fmt.Errorf("duplicated fieldID: %d", field.FieldID)
		}
		helper.nameOffset[field.Name] = offset
		helper.idOffset[field.FieldID] = offset

		if field.IsPrimaryKey {
			if err := claim(&helper.primaryKeyOffset, offset, "primary key"); err != nil {
				return nil, err
			}
		}
		if field.IsPartitionKey {
			if err := claim(&helper.partitionKeyOffset, offset, "partition key"); err != nil {
				return nil, err
			}
		}
		if field.IsClusteringKey {
			if err := claim(&helper.clusteringKeyOffset, offset, "clustering key"); err != nil {
				return nil, err
			}
		}
		if field.IsDynamic {
			if err := claim(&helper.dynamicFieldOffset, offset, "dynamic field"); err != nil {
				return nil, err
			}
		}
	}
	return helper, nil
}

// CreateSchemaHelper returns a new SchemaHelper for schema with no load-field
// restriction.
func CreateSchemaHelper(schema *schemapb.CollectionSchema) (*SchemaHelper, error) {
	helper, err := CreateSchemaHelperWithLoadFields(schema, nil)
	return helper, err
}

// GetPrimaryKeyField returns the schema of the primary key field, or an error
// when the schema declares no primary key.
func (helper *SchemaHelper) GetPrimaryKeyField() (*schemapb.FieldSchema, error) {
	if offset := helper.primaryKeyOffset; offset != -1 {
		return helper.schema.Fields[offset], nil
	}
	return nil, errors.New("failed to get primary key field: no primary in schema")
}

// GetPartitionKeyField returns the schema of the partition key field, or an
// error when the schema declares no partition key.
func (helper *SchemaHelper) GetPartitionKeyField() (*schemapb.FieldSchema, error) {
	if offset := helper.partitionKeyOffset; offset != -1 {
		return helper.schema.Fields[offset], nil
	}
	return nil, errors.New("failed to get partition key field: no partition key in schema")
}

// GetClusteringKeyField returns the schema of the clustering key field, or an
// error when the schema declares no clustering key.
func (helper *SchemaHelper) GetClusteringKeyField() (*schemapb.FieldSchema, error) {
	if offset := helper.clusteringKeyOffset; offset != -1 {
		return helper.schema.Fields[offset], nil
	}
	return nil, errors.New("failed to get clustering key field: not clustering key in schema")
}

// GetDynamicField returns the schema of the dynamic field, or an error when
// the schema defines no dynamic field.
func (helper *SchemaHelper) GetDynamicField() (*schemapb.FieldSchema, error) {
	if offset := helper.dynamicFieldOffset; offset != -1 {
		return helper.schema.Fields[offset], nil
	}
	return nil, errors.New("failed to get dynamic field: no dynamic field in schema")
}

// GetFieldFromName looks up a field schema by its name and returns an error
// when no field has that name.
func (helper *SchemaHelper) GetFieldFromName(fieldName string) (*schemapb.FieldSchema, error) {
	if offset, found := helper.nameOffset[fieldName]; found {
		return helper.schema.Fields[offset], nil
	}
	return nil, fmt.Errorf("failed to get field schema by name: fieldName(%s) not found", fieldName)
}

// GetFieldFromNameDefaultJSON looks up a field schema by name. When no field
// has that name it falls back to the dynamic JSON field (see
// getDefaultJSONField). A field that exists but is not loaded yields an error
// naming the field.
func (helper *SchemaHelper) GetFieldFromNameDefaultJSON(fieldName string) (*schemapb.FieldSchema, error) {
	offset, ok := helper.nameOffset[fieldName]
	if !ok {
		return helper.getDefaultJSONField(fieldName)
	}
	fieldSchema := helper.schema.Fields[offset]
	if !helper.IsFieldLoaded(fieldSchema.GetFieldID()) {
		// Report the field name; formatting the whole schema message with %s
		// would dump its text form into the error.
		return nil, errors.Newf("field %s is not loaded", fieldName)
	}
	return fieldSchema, nil
}

// IsFieldLoaded reports whether the field with the given ID is loaded.
// When no load fields were provided, every field is treated as loaded.
func (helper *SchemaHelper) IsFieldLoaded(fieldID int64) bool {
	return len(helper.loadFields) == 0 || helper.loadFields.Contain(fieldID)
}

// getDefaultJSONField returns the first JSON field flagged as dynamic, to be
// used in place of the unknown field fieldName. It fails when that dynamic
// field is not loaded, or when the schema has no dynamic JSON field (in which
// case a warning is logged as well).
func (helper *SchemaHelper) getDefaultJSONField(fieldName string) (*schemapb.FieldSchema, error) {
	for _, candidate := range helper.schema.GetFields() {
		if candidate.DataType != schemapb.DataType_JSON || !candidate.IsDynamic {
			continue
		}
		if helper.IsFieldLoaded(candidate.GetFieldID()) {
			return candidate, nil
		}
		return nil, errors.Newf("field %s is dynamic but dynamic field is not loaded", fieldName)
	}

	errMsg := fmt.Sprintf("field %s not exist", fieldName)
	log.Warn(errMsg)
	return nil, errors.New(errMsg)
}

// GetFieldFromID looks up a field schema by its ID and returns an error when
// no field has that ID.
func (helper *SchemaHelper) GetFieldFromID(fieldID int64) (*schemapb.FieldSchema, error) {
	if offset, found := helper.idOffset[fieldID]; found {
		return helper.schema.Fields[offset], nil
	}
	return nil, fmt.Errorf("fieldID(%d) not found", fieldID)
}

// GetVectorDimFromID returns the dimension declared by the dim type param of
// the vector field with the given ID. It fails when the field is unknown, is
// not a vector type, declares no dim, or declares a non-numeric dim.
func (helper *SchemaHelper) GetVectorDimFromID(fieldID int64) (int, error) {
	field, err := helper.GetFieldFromID(fieldID)
	if err != nil {
		return 0, err
	}
	if !IsVectorType(field.DataType) {
		return 0, fmt.Errorf("field type = %s not has dim", schemapb.DataType_name[int32(field.DataType)])
	}

	for _, param := range field.TypeParams {
		if param.Key != common.DimKey {
			continue
		}
		// The first dim param decides the outcome.
		dim, parseErr := strconv.Atoi(param.Value)
		if parseErr != nil {
			return 0, parseErr
		}
		return dim, nil
	}
	return 0, fmt.Errorf("fieldID(%d) not has dim", fieldID)
}

// GetFunctionByOutputField returns the first function in the schema that lists
// field's ID among its output field IDs, or an error when there is none.
func (helper *SchemaHelper) GetFunctionByOutputField(field *schemapb.FieldSchema) (*schemapb.FunctionSchema, error) {
	wanted := field.GetFieldID()
	for _, fn := range helper.schema.GetFunctions() {
		for _, outputID := range fn.GetOutputFieldIds() {
			if outputID == wanted {
				return fn, nil
			}
		}
	}
	return nil, errors.New("function not exist")
}

// CanRetrieveRawFieldData reports whether raw data of field can be retrieved.
// As of now only the output field of a BM25 function cannot be; a function
// output field whose function cannot be found is also reported as not
// retrievable.
func (helper *SchemaHelper) CanRetrieveRawFieldData(field *schemapb.FieldSchema) bool {
	if field.IsFunctionOutput {
		fn, err := helper.GetFunctionByOutputField(field)
		return err == nil && fn.GetType() != schemapb.FunctionType_BM25
	}
	return true
}

// GetCollectionName returns the name stored in the wrapped collection schema.
func (helper *SchemaHelper) GetCollectionName() string {
	name := helper.schema.Name
	return name
}

// IsBinaryVectorType returns true if input is the binary vector type.
func IsBinaryVectorType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_BinaryVector:
		return true
	}
	return false
}

// IsDenseFloatVectorType returns true if input is FloatVector, Float16Vector
// or BFloat16Vector.
func IsDenseFloatVectorType(dataType schemapb.DataType) bool {
	return dataType == schemapb.DataType_FloatVector ||
		dataType == schemapb.DataType_Float16Vector ||
		dataType == schemapb.DataType_BFloat16Vector
}

// VectorTypeSize returns the size in bytes of one dimension of the given
// vector type, or 0 for any other type.
func VectorTypeSize(dataType schemapb.DataType) float64 {
	if dataType == schemapb.DataType_BinaryVector {
		return 0.125
	}
	if dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector {
		return 2.0
	}
	if dataType == schemapb.DataType_FloatVector || dataType == schemapb.DataType_SparseFloatVector {
		return 4.0
	}
	return 0.0
}

// IsSparseFloatVectorType returns true if input is the sparse float vector type.
func IsSparseFloatVectorType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_SparseFloatVector:
		return true
	}
	return false
}

// IsFloatVectorType returns true if input is a dense or sparse float vector type.
func IsFloatVectorType(dataType schemapb.DataType) bool {
	if IsDenseFloatVectorType(dataType) {
		return true
	}
	return IsSparseFloatVectorType(dataType)
}

// IsFixDimVectorType returns true if input is a binary or dense float vector type.
func IsFixDimVectorType(dataType schemapb.DataType) bool {
	if IsBinaryVectorType(dataType) {
		return true
	}
	return IsDenseFloatVectorType(dataType)
}

// IsVectorType returns true if input is a binary or float (dense or sparse)
// vector type, otherwise false.
func IsVectorType(dataType schemapb.DataType) bool {
	if IsBinaryVectorType(dataType) {
		return true
	}
	return IsFloatVectorType(dataType)
}

// IsIntegerType returns true if input is Int8, Int16, Int32 or Int64,
// otherwise false.
func IsIntegerType(dataType schemapb.DataType) bool {
	return dataType == schemapb.DataType_Int8 ||
		dataType == schemapb.DataType_Int16 ||
		dataType == schemapb.DataType_Int32 ||
		dataType == schemapb.DataType_Int64
}

// IsJSONType returns true if input is the JSON type.
func IsJSONType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_JSON:
		return true
	}
	return false
}

// IsGeometryType returns true if input is the Geometry type.
func IsGeometryType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_Geometry:
		return true
	}
	return false
}

// IsArrayType returns true if input is the Array type.
func IsArrayType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_Array:
		return true
	}
	return false
}

// IsFloatingType returns true if input is Float or Double, otherwise false.
func IsFloatingType(dataType schemapb.DataType) bool {
	return dataType == schemapb.DataType_Float || dataType == schemapb.DataType_Double
}

// IsArithmetic returns true if input is an integer or floating type,
// otherwise false.
func IsArithmetic(dataType schemapb.DataType) bool {
	if IsIntegerType(dataType) {
		return true
	}
	return IsFloatingType(dataType)
}

// IsBoolType returns true if input is the Bool type, otherwise false.
func IsBoolType(dataType schemapb.DataType) bool {
	return dataType == schemapb.DataType_Bool
}

// IsStringType returns true if input is String or VarChar, otherwise false.
func IsStringType(dataType schemapb.DataType) bool {
	return dataType == schemapb.DataType_String || dataType == schemapb.DataType_VarChar
}

// IsArrayContainStringElementType returns true if dataType is Array and
// elementType is String or VarChar.
func IsArrayContainStringElementType(dataType schemapb.DataType, elementType schemapb.DataType) bool {
	if !IsArrayType(dataType) {
		return false
	}
	switch elementType {
	case schemapb.DataType_String, schemapb.DataType_VarChar:
		return true
	}
	return false
}

// IsVariableDataType returns true if input is a string, Array, JSON or
// Geometry type.
func IsVariableDataType(dataType schemapb.DataType) bool {
	switch {
	case IsStringType(dataType):
		return true
	case IsArrayType(dataType):
		return true
	case IsJSONType(dataType):
		return true
	}
	return IsGeometryType(dataType)
}

// IsPrimitiveType returns true if input is an arithmetic, string or bool type.
func IsPrimitiveType(dataType schemapb.DataType) bool {
	switch {
	case IsArithmetic(dataType):
		return true
	case IsStringType(dataType):
		return true
	}
	return IsBoolType(dataType)
}

// PrepareResultFieldData constructs the slice of FieldData used for the final
// result reduce. For every entry of sample it creates an empty FieldData of the
// same type, name, ID and dynamic flag whose inner data slice is preallocated
// for topK rows, so that appending rows does not pay for slice growth.
// Sparse float vector columns start with Dim 0; it is updated when data is
// appended.
func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemapb.FieldData {
	prepared := make([]*schemapb.FieldData, 0, len(sample))
	for _, src := range sample {
		out := &schemapb.FieldData{
			Type:      src.Type,
			FieldName: src.FieldName,
			FieldId:   src.FieldId,
			IsDynamic: src.IsDynamic,
		}

		switch payload := src.Field.(type) {
		case *schemapb.FieldData_Scalars:
			srcScalars := src.GetScalars()
			column := &schemapb.ScalarField{}
			switch payload.Scalars.Data.(type) {
			case *schemapb.ScalarField_BoolData:
				column.Data = &schemapb.ScalarField_BoolData{
					BoolData: &schemapb.BoolArray{Data: make([]bool, 0, topK)},
				}
			case *schemapb.ScalarField_IntData:
				column.Data = &schemapb.ScalarField_IntData{
					IntData: &schemapb.IntArray{Data: make([]int32, 0, topK)},
				}
			case *schemapb.ScalarField_LongData:
				column.Data = &schemapb.ScalarField_LongData{
					LongData: &schemapb.LongArray{Data: make([]int64, 0, topK)},
				}
			case *schemapb.ScalarField_FloatData:
				column.Data = &schemapb.ScalarField_FloatData{
					FloatData: &schemapb.FloatArray{Data: make([]float32, 0, topK)},
				}
			case *schemapb.ScalarField_DoubleData:
				column.Data = &schemapb.ScalarField_DoubleData{
					DoubleData: &schemapb.DoubleArray{Data: make([]float64, 0, topK)},
				}
			case *schemapb.ScalarField_StringData:
				column.Data = &schemapb.ScalarField_StringData{
					StringData: &schemapb.StringArray{Data: make([]string, 0, topK)},
				}
			case *schemapb.ScalarField_JsonData:
				column.Data = &schemapb.ScalarField_JsonData{
					JsonData: &schemapb.JSONArray{Data: make([][]byte, 0, topK)},
				}
			case *schemapb.ScalarField_GeometryData:
				column.Data = &schemapb.ScalarField_GeometryData{
					GeometryData: &schemapb.GeometryArray{Data: make([][]byte, 0, topK)},
				}
			case *schemapb.ScalarField_ArrayData:
				column.Data = &schemapb.ScalarField_ArrayData{
					ArrayData: &schemapb.ArrayArray{
						Data:        make([]*schemapb.ScalarField, 0, topK),
						ElementType: srcScalars.GetArrayData().GetElementType(),
					},
				}
			}
			out.Field = &schemapb.FieldData_Scalars{Scalars: column}

		case *schemapb.FieldData_Vectors:
			dim := src.GetVectors().GetDim()
			column := &schemapb.VectorField{Dim: dim}
			switch payload.Vectors.Data.(type) {
			case *schemapb.VectorField_FloatVector:
				column.Data = &schemapb.VectorField_FloatVector{
					FloatVector: &schemapb.FloatArray{Data: make([]float32, 0, dim*topK)},
				}
			case *schemapb.VectorField_Float16Vector:
				column.Data = &schemapb.VectorField_Float16Vector{
					Float16Vector: make([]byte, 0, topK*dim*2),
				}
			case *schemapb.VectorField_Bfloat16Vector:
				column.Data = &schemapb.VectorField_Bfloat16Vector{
					Bfloat16Vector: make([]byte, 0, topK*dim*2),
				}
			case *schemapb.VectorField_BinaryVector:
				column.Data = &schemapb.VectorField_BinaryVector{
					BinaryVector: make([]byte, 0, topK*dim/8),
				}
			case *schemapb.VectorField_SparseFloatVector:
				// dim is to be updated when appending data.
				column.Data = &schemapb.VectorField_SparseFloatVector{
					SparseFloatVector: &schemapb.SparseFloatArray{
						Dim:      0,
						Contents: make([][]byte, 0, topK),
					},
				}
				column.Dim = 0
			}
			out.Field = &schemapb.FieldData_Vectors{Vectors: column}
		}

		prepared = append(prepared, out)
	}
	return prepared
}

// AppendFieldData appends fields data of specified index from src to dst.
//
// For every column src[i], the row at idx is appended to dst[i]. dst is indexed
// with the same i, so it must have at least len(src) entries. A dst[i] that is
// nil, or that does not hold the matching scalar/vector kind, is replaced by a
// fresh FieldData copying the type, name and ID of src[i] (and IsDynamic for
// scalar columns). When a scalar src column carries ValidData, the entry at idx
// is appended to dst[i].ValidData as well.
//
// For vector columns one row spans dim/8 bytes (binary), dim elements (float),
// or dim*2 bytes (float16 / bfloat16). Sparse float rows are delegated to
// appendSparseFloatArraySingleRow.
//
// The returned appendSize is the sum of unsafe.Sizeof over the appended
// expressions, plus whatever appendSparseFloatArraySingleRow reports.
// NOTE(review): unsafe.Sizeof of a string, slice or pointer is the size of its
// header, not of the data it refers to, so for string/array/json/geometry rows
// and for dense vector rows the sum does not reflect the payload length.
// Confirm whether callers expect payload bytes here.
//
// Unsupported scalar or vector payload kinds are logged and skipped.
func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int64) {
	for i, fieldData := range src {
		switch fieldType := fieldData.Field.(type) {
		case *schemapb.FieldData_Scalars:
			// Lazily create the destination column when it is missing or is not a scalar column.
			if dst[i] == nil || dst[i].GetScalars() == nil {
				dst[i] = &schemapb.FieldData{
					Type:      fieldData.Type,
					FieldName: fieldData.FieldName,
					FieldId:   fieldData.FieldId,
					IsDynamic: fieldData.IsDynamic,
					Field: &schemapb.FieldData_Scalars{
						Scalars: &schemapb.ScalarField{},
					},
				}
			}
			// Carry over the validity flag of the row when the source provides ValidData.
			if len(fieldData.GetValidData()) != 0 {
				dst[i].ValidData = append(dst[i].ValidData, fieldData.ValidData[idx])
			}
			dstScalar := dst[i].GetScalars()
			// Each case below either starts the destination slice with the row or appends to it.
			switch srcScalar := fieldType.Scalars.Data.(type) {
			case *schemapb.ScalarField_BoolData:
				if dstScalar.GetBoolData() == nil {
					dstScalar.Data = &schemapb.ScalarField_BoolData{
						BoolData: &schemapb.BoolArray{
							Data: []bool{srcScalar.BoolData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetBoolData().Data = append(dstScalar.GetBoolData().Data, srcScalar.BoolData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.BoolData.Data[idx]))
			case *schemapb.ScalarField_IntData:
				if dstScalar.GetIntData() == nil {
					dstScalar.Data = &schemapb.ScalarField_IntData{
						IntData: &schemapb.IntArray{
							Data: []int32{srcScalar.IntData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetIntData().Data = append(dstScalar.GetIntData().Data, srcScalar.IntData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.IntData.Data[idx]))
			case *schemapb.ScalarField_LongData:
				if dstScalar.GetLongData() == nil {
					dstScalar.Data = &schemapb.ScalarField_LongData{
						LongData: &schemapb.LongArray{
							Data: []int64{srcScalar.LongData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetLongData().Data = append(dstScalar.GetLongData().Data, srcScalar.LongData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.LongData.Data[idx]))
			case *schemapb.ScalarField_FloatData:
				if dstScalar.GetFloatData() == nil {
					dstScalar.Data = &schemapb.ScalarField_FloatData{
						FloatData: &schemapb.FloatArray{
							Data: []float32{srcScalar.FloatData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetFloatData().Data = append(dstScalar.GetFloatData().Data, srcScalar.FloatData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.FloatData.Data[idx]))
			case *schemapb.ScalarField_DoubleData:
				if dstScalar.GetDoubleData() == nil {
					dstScalar.Data = &schemapb.ScalarField_DoubleData{
						DoubleData: &schemapb.DoubleArray{
							Data: []float64{srcScalar.DoubleData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetDoubleData().Data = append(dstScalar.GetDoubleData().Data, srcScalar.DoubleData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.DoubleData.Data[idx]))
			case *schemapb.ScalarField_StringData:
				if dstScalar.GetStringData() == nil {
					dstScalar.Data = &schemapb.ScalarField_StringData{
						StringData: &schemapb.StringArray{
							Data: []string{srcScalar.StringData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetStringData().Data = append(dstScalar.GetStringData().Data, srcScalar.StringData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.StringData.Data[idx]))
			case *schemapb.ScalarField_ArrayData:
				if dstScalar.GetArrayData() == nil {
					dstScalar.Data = &schemapb.ScalarField_ArrayData{
						ArrayData: &schemapb.ArrayArray{
							Data:        []*schemapb.ScalarField{srcScalar.ArrayData.Data[idx]},
							ElementType: srcScalar.ArrayData.ElementType,
						},
					}
				} else {
					dstScalar.GetArrayData().Data = append(dstScalar.GetArrayData().Data, srcScalar.ArrayData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.ArrayData.Data[idx]))
			case *schemapb.ScalarField_JsonData:
				if dstScalar.GetJsonData() == nil {
					dstScalar.Data = &schemapb.ScalarField_JsonData{
						JsonData: &schemapb.JSONArray{
							Data: [][]byte{srcScalar.JsonData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetJsonData().Data = append(dstScalar.GetJsonData().Data, srcScalar.JsonData.Data[idx])
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcScalar.JsonData.Data[idx]))
			case *schemapb.ScalarField_GeometryData:
				if dstScalar.GetGeometryData() == nil {
					dstScalar.Data = &schemapb.ScalarField_GeometryData{
						GeometryData: &schemapb.GeometryArray{
							Data: [][]byte{srcScalar.GeometryData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetGeometryData().Data = append(dstScalar.GetGeometryData().Data, srcScalar.GeometryData.Data[idx])
				}
				appendSize += int64(unsafe.Sizeof(srcScalar.GeometryData.Data[idx]))
			// just for result
			// NOTE(review): unlike the other cases, this branch adds nothing to appendSize.
			case *schemapb.ScalarField_GeometryWktData:
				if dstScalar.GetGeometryWktData() == nil {
					dstScalar.Data = &schemapb.ScalarField_GeometryWktData{
						GeometryWktData: &schemapb.GeometryWktArray{
							Data: []string{srcScalar.GeometryWktData.Data[idx]},
						},
					}
				} else {
					dstScalar.GetGeometryWktData().Data = append(dstScalar.GetGeometryWktData().Data, srcScalar.GeometryWktData.Data[idx])
				}
			default:
				log.Error("Not supported field type", zap.String("field type", fieldData.Type.String()))
			}
		case *schemapb.FieldData_Vectors:
			dim := fieldType.Vectors.Dim
			// Lazily create the destination column when it is missing or is not a vector column.
			if dst[i] == nil || dst[i].GetVectors() == nil {
				dst[i] = &schemapb.FieldData{
					Type:      fieldData.Type,
					FieldName: fieldData.FieldName,
					FieldId:   fieldData.FieldId,
					Field: &schemapb.FieldData_Vectors{
						Vectors: &schemapb.VectorField{
							Dim: dim,
						},
					},
				}
			}
			dstVector := dst[i].GetVectors()
			// For dense and binary vectors the first row is copied into a new
			// slice; later rows are appended to the existing one.
			switch srcVector := fieldType.Vectors.Data.(type) {
			case *schemapb.VectorField_BinaryVector:
				if dstVector.GetBinaryVector() == nil {
					srcToCopy := srcVector.BinaryVector[idx*(dim/8) : (idx+1)*(dim/8)]
					dstVector.Data = &schemapb.VectorField_BinaryVector{
						BinaryVector: make([]byte, len(srcToCopy)),
					}
					copy(dstVector.Data.(*schemapb.VectorField_BinaryVector).BinaryVector, srcToCopy)
				} else {
					dstBinaryVector := dstVector.Data.(*schemapb.VectorField_BinaryVector)
					dstBinaryVector.BinaryVector = append(dstBinaryVector.BinaryVector, srcVector.BinaryVector[idx*(dim/8):(idx+1)*(dim/8)]...)
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcVector.BinaryVector[idx*(dim/8) : (idx+1)*(dim/8)]))
			case *schemapb.VectorField_FloatVector:
				if dstVector.GetFloatVector() == nil {
					srcToCopy := srcVector.FloatVector.Data[idx*dim : (idx+1)*dim]
					dstVector.Data = &schemapb.VectorField_FloatVector{
						FloatVector: &schemapb.FloatArray{
							Data: make([]float32, len(srcToCopy)),
						},
					}
					copy(dstVector.Data.(*schemapb.VectorField_FloatVector).FloatVector.Data, srcToCopy)
				} else {
					dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data[idx*dim:(idx+1)*dim]...)
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcVector.FloatVector.Data[idx*dim : (idx+1)*dim]))
			case *schemapb.VectorField_Float16Vector:
				if dstVector.GetFloat16Vector() == nil {
					srcToCopy := srcVector.Float16Vector[idx*(dim*2) : (idx+1)*(dim*2)]
					dstVector.Data = &schemapb.VectorField_Float16Vector{
						Float16Vector: make([]byte, len(srcToCopy)),
					}
					copy(dstVector.Data.(*schemapb.VectorField_Float16Vector).Float16Vector, srcToCopy)
				} else {
					dstFloat16Vector := dstVector.Data.(*schemapb.VectorField_Float16Vector)
					dstFloat16Vector.Float16Vector = append(dstFloat16Vector.Float16Vector, srcVector.Float16Vector[idx*(dim*2):(idx+1)*(dim*2)]...)
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcVector.Float16Vector[idx*(dim*2) : (idx+1)*(dim*2)]))
			case *schemapb.VectorField_Bfloat16Vector:
				if dstVector.GetBfloat16Vector() == nil {
					srcToCopy := srcVector.Bfloat16Vector[idx*(dim*2) : (idx+1)*(dim*2)]
					dstVector.Data = &schemapb.VectorField_Bfloat16Vector{
						Bfloat16Vector: make([]byte, len(srcToCopy)),
					}
					copy(dstVector.Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector, srcToCopy)
				} else {
					dstBfloat16Vector := dstVector.Data.(*schemapb.VectorField_Bfloat16Vector)
					dstBfloat16Vector.Bfloat16Vector = append(dstBfloat16Vector.Bfloat16Vector, srcVector.Bfloat16Vector[idx*(dim*2):(idx+1)*(dim*2)]...)
				}
				/* #nosec G103 */
				appendSize += int64(unsafe.Sizeof(srcVector.Bfloat16Vector[idx*(dim*2) : (idx+1)*(dim*2)]))
			case *schemapb.VectorField_SparseFloatVector:
				if dstVector.GetSparseFloatVector() == nil {
					dstVector.Data = &schemapb.VectorField_SparseFloatVector{
						SparseFloatVector: &schemapb.SparseFloatArray{
							Dim:      0,
							Contents: make([][]byte, 0),
						},
					}
					dstVector.Dim = srcVector.SparseFloatVector.Dim
				}
				vec := dstVector.Data.(*schemapb.VectorField_SparseFloatVector).SparseFloatVector
				appendSize += appendSparseFloatArraySingleRow(vec, srcVector.SparseFloatVector, idx)
			default:
				log.Error("Not supported field type", zap.String("field type", fieldData.Type.String()))
			}
		}
	}

	return
}

// DeleteFieldData removes the row appended last from every field in dst.
//
// Scalar fields lose their final element. Dense vector fields lose one
// vector's worth of storage derived from the field's Dim: Dim/8 bytes for
// binary vectors, Dim elements for float vectors and Dim*2 bytes for
// float16/bfloat16 vectors. Sparse float vectors lose their last row.
// Payload kinds without a case below are reported through log.Error and left
// untouched.
//
// Processing stops at the first nil entry, or at the first scalar/vector
// entry whose payload is nil; later entries are then not modified.
func DeleteFieldData(dst []*schemapb.FieldData) {
	for _, fieldData := range dst {
		// The nil guard must run before fieldData.Field is read; placing it
		// inside the type switch is too late to prevent a nil dereference.
		if fieldData == nil {
			log.Info("empty field data can't be deleted")
			return
		}
		switch fieldData.Field.(type) {
		case *schemapb.FieldData_Scalars:
			dstScalar := fieldData.GetScalars()
			if dstScalar == nil {
				log.Info("empty field data can't be deleted")
				return
			}
			switch data := dstScalar.Data.(type) {
			case *schemapb.ScalarField_BoolData:
				arr := data.BoolData
				arr.Data = arr.Data[:len(arr.Data)-1]
			case *schemapb.ScalarField_IntData:
				arr := data.IntData
				arr.Data = arr.Data[:len(arr.Data)-1]
			case *schemapb.ScalarField_LongData:
				arr := data.LongData
				arr.Data = arr.Data[:len(arr.Data)-1]
			case *schemapb.ScalarField_FloatData:
				arr := data.FloatData
				arr.Data = arr.Data[:len(arr.Data)-1]
			case *schemapb.ScalarField_DoubleData:
				arr := data.DoubleData
				arr.Data = arr.Data[:len(arr.Data)-1]
			case *schemapb.ScalarField_StringData:
				arr := data.StringData
				arr.Data = arr.Data[:len(arr.Data)-1]
			case *schemapb.ScalarField_JsonData:
				arr := data.JsonData
				arr.Data = arr.Data[:len(arr.Data)-1]
			case *schemapb.ScalarField_GeometryData:
				arr := data.GeometryData
				arr.Data = arr.Data[:len(arr.Data)-1]
			default:
				log.Error("wrong field type added", zap.String("field type", fieldData.Type.String()))
			}
		case *schemapb.FieldData_Vectors:
			dstVector := fieldData.GetVectors()
			if dstVector == nil {
				log.Info("empty field data can't be deleted")
				return
			}
			dim := dstVector.Dim
			switch vec := dstVector.Data.(type) {
			case *schemapb.VectorField_BinaryVector:
				// One binary vector occupies dim/8 bytes.
				vec.BinaryVector = vec.BinaryVector[:len(vec.BinaryVector)-int(dim/8)]
			case *schemapb.VectorField_FloatVector:
				floats := vec.FloatVector
				floats.Data = floats.Data[:len(floats.Data)-int(dim)]
			case *schemapb.VectorField_Float16Vector:
				// One float16 vector occupies dim*2 bytes.
				vec.Float16Vector = vec.Float16Vector[:len(vec.Float16Vector)-int(dim*2)]
			case *schemapb.VectorField_Bfloat16Vector:
				// One bfloat16 vector occupies dim*2 bytes.
				vec.Bfloat16Vector = vec.Bfloat16Vector[:len(vec.Bfloat16Vector)-int(dim*2)]
			case *schemapb.VectorField_SparseFloatVector:
				trimSparseFloatArray(vec.SparseFloatVector)
			default:
				log.Error("wrong field type added", zap.String("field type", fieldData.Type.String()))
			}
		}
	}
}

// MergeFieldData appends the rows of every src column onto the dst column
// that carries the same FieldId.
//
// Only scalar and vector payloads are handled; a src entry with any other
// Field kind is skipped. A scalar or vector payload whose inner data kind has
// no case below is logged and reported as an error, which aborts the merge
// with the earlier columns already merged.
//
// Caveats visible in the implementation:
//   - A column created for a FieldId missing from dst is appended to the local
//     dst slice only, so the caller's slice does not grow.
//   - When the destination column holds no data of the incoming kind yet, it
//     takes over the src slice (or, for sparse vectors, the src array) instead
//     of copying it.
//   - ValidData is merged for scalar columns only.
func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error {
	// unsupported logs and builds the error for a payload kind without a
	// merge rule.
	unsupported := func(fd *schemapb.FieldData) error {
		log.Error("Not supported data type", zap.String("data type", fd.Type.String()))
		return errors.New("unsupported data type: " + fd.Type.String())
	}

	byFieldID := make(map[int64]*schemapb.FieldData)
	for _, existing := range dst {
		byFieldID[existing.FieldId] = existing
	}

	for _, srcFieldData := range src {
		switch payload := srcFieldData.Field.(type) {
		case *schemapb.FieldData_Scalars:
			target, found := byFieldID[srcFieldData.FieldId]
			if !found {
				target = &schemapb.FieldData{
					Type:      srcFieldData.Type,
					FieldName: srcFieldData.FieldName,
					FieldId:   srcFieldData.FieldId,
					Field: &schemapb.FieldData_Scalars{
						Scalars: &schemapb.ScalarField{},
					},
				}
				dst = append(dst, target)
				byFieldID[srcFieldData.FieldId] = target
			}
			target.ValidData = append(target.ValidData, srcFieldData.GetValidData()...)
			dstScalar := target.GetScalars()

			switch srcScalar := payload.Scalars.Data.(type) {
			case *schemapb.ScalarField_BoolData:
				if have := dstScalar.GetBoolData(); have != nil {
					have.Data = append(have.Data, srcScalar.BoolData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_BoolData{
						BoolData: &schemapb.BoolArray{Data: srcScalar.BoolData.Data},
					}
				}
			case *schemapb.ScalarField_IntData:
				if have := dstScalar.GetIntData(); have != nil {
					have.Data = append(have.Data, srcScalar.IntData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_IntData{
						IntData: &schemapb.IntArray{Data: srcScalar.IntData.Data},
					}
				}
			case *schemapb.ScalarField_LongData:
				if have := dstScalar.GetLongData(); have != nil {
					have.Data = append(have.Data, srcScalar.LongData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_LongData{
						LongData: &schemapb.LongArray{Data: srcScalar.LongData.Data},
					}
				}
			case *schemapb.ScalarField_FloatData:
				if have := dstScalar.GetFloatData(); have != nil {
					have.Data = append(have.Data, srcScalar.FloatData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_FloatData{
						FloatData: &schemapb.FloatArray{Data: srcScalar.FloatData.Data},
					}
				}
			case *schemapb.ScalarField_DoubleData:
				if have := dstScalar.GetDoubleData(); have != nil {
					have.Data = append(have.Data, srcScalar.DoubleData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_DoubleData{
						DoubleData: &schemapb.DoubleArray{Data: srcScalar.DoubleData.Data},
					}
				}
			case *schemapb.ScalarField_StringData:
				if have := dstScalar.GetStringData(); have != nil {
					have.Data = append(have.Data, srcScalar.StringData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_StringData{
						StringData: &schemapb.StringArray{Data: srcScalar.StringData.Data},
					}
				}
			case *schemapb.ScalarField_ArrayData:
				if have := dstScalar.GetArrayData(); have != nil {
					have.Data = append(have.Data, srcScalar.ArrayData.Data...)
				} else {
					// The element type is only recorded when the array column
					// is first populated.
					dstScalar.Data = &schemapb.ScalarField_ArrayData{
						ArrayData: &schemapb.ArrayArray{
							Data:        srcScalar.ArrayData.Data,
							ElementType: srcScalar.ArrayData.ElementType,
						},
					}
				}
			case *schemapb.ScalarField_JsonData:
				if have := dstScalar.GetJsonData(); have != nil {
					have.Data = append(have.Data, srcScalar.JsonData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_JsonData{
						JsonData: &schemapb.JSONArray{Data: srcScalar.JsonData.Data},
					}
				}
			case *schemapb.ScalarField_GeometryData:
				if have := dstScalar.GetGeometryData(); have != nil {
					have.Data = append(have.Data, srcScalar.GeometryData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_GeometryData{
						GeometryData: &schemapb.GeometryArray{Data: srcScalar.GeometryData.Data},
					}
				}
			case *schemapb.ScalarField_BytesData:
				if have := dstScalar.GetBytesData(); have != nil {
					have.Data = append(have.Data, srcScalar.BytesData.Data...)
				} else {
					dstScalar.Data = &schemapb.ScalarField_BytesData{
						BytesData: &schemapb.BytesArray{Data: srcScalar.BytesData.Data},
					}
				}
			default:
				return unsupported(srcFieldData)
			}

		case *schemapb.FieldData_Vectors:
			dim := payload.Vectors.Dim
			target, found := byFieldID[srcFieldData.FieldId]
			if !found {
				target = &schemapb.FieldData{
					Type:      srcFieldData.Type,
					FieldName: srcFieldData.FieldName,
					FieldId:   srcFieldData.FieldId,
					Field: &schemapb.FieldData_Vectors{
						Vectors: &schemapb.VectorField{
							Dim: dim,
						},
					},
				}
				dst = append(dst, target)
				byFieldID[srcFieldData.FieldId] = target
			}
			dstVector := target.GetVectors()

			switch srcVector := payload.Vectors.Data.(type) {
			case *schemapb.VectorField_BinaryVector:
				if dstVector.GetBinaryVector() != nil {
					holder := dstVector.Data.(*schemapb.VectorField_BinaryVector)
					holder.BinaryVector = append(holder.BinaryVector, srcVector.BinaryVector...)
				} else {
					dstVector.Data = &schemapb.VectorField_BinaryVector{
						BinaryVector: srcVector.BinaryVector,
					}
				}
			case *schemapb.VectorField_Float16Vector:
				if dstVector.GetFloat16Vector() != nil {
					holder := dstVector.Data.(*schemapb.VectorField_Float16Vector)
					holder.Float16Vector = append(holder.Float16Vector, srcVector.Float16Vector...)
				} else {
					dstVector.Data = &schemapb.VectorField_Float16Vector{
						Float16Vector: srcVector.Float16Vector,
					}
				}
			case *schemapb.VectorField_Bfloat16Vector:
				if dstVector.GetBfloat16Vector() != nil {
					holder := dstVector.Data.(*schemapb.VectorField_Bfloat16Vector)
					holder.Bfloat16Vector = append(holder.Bfloat16Vector, srcVector.Bfloat16Vector...)
				} else {
					dstVector.Data = &schemapb.VectorField_Bfloat16Vector{
						Bfloat16Vector: srcVector.Bfloat16Vector,
					}
				}
			case *schemapb.VectorField_FloatVector:
				if have := dstVector.GetFloatVector(); have != nil {
					have.Data = append(have.Data, srcVector.FloatVector.Data...)
				} else {
					dstVector.Data = &schemapb.VectorField_FloatVector{
						FloatVector: &schemapb.FloatArray{Data: srcVector.FloatVector.Data},
					}
				}
			case *schemapb.VectorField_SparseFloatVector:
				if have := dstVector.GetSparseFloatVector(); have != nil {
					appendSparseFloatArray(have, srcVector.SparseFloatVector)
				} else {
					dstVector.Data = &schemapb.VectorField_SparseFloatVector{
						SparseFloatVector: srcVector.SparseFloatVector,
					}
				}
			default:
				return unsupported(srcFieldData)
			}
		}
	}

	return nil
}

// GetVectorFieldSchemas collects, in schema order, every field of the
// collection schema whose data type satisfies IsVectorType. The result is an
// empty, non-nil slice when no field qualifies.
func GetVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSchema {
	vectorFields := make([]*schemapb.FieldSchema, 0)
	for _, candidate := range schema.GetFields() {
		if !IsVectorType(candidate.DataType) {
			continue
		}
		vectorFields = append(vectorFields, candidate)
	}
	return vectorFields
}

// GetDenseVectorFieldSchemas collects, in schema order, every field whose
// data type satisfies IsDenseFloatVectorType or IsBinaryVectorType. The result
// is an empty, non-nil slice when no field qualifies.
func GetDenseVectorFieldSchemas(schema *schemapb.CollectionSchema) []*schemapb.FieldSchema {
	denseFields := make([]*schemapb.FieldSchema, 0)
	for _, candidate := range schema.GetFields() {
		if !IsDenseFloatVectorType(candidate.DataType) && !IsBinaryVectorType(candidate.DataType) {
			continue
		}
		denseFields = append(denseFields, candidate)
	}
	return denseFields
}

// GetPrimaryFieldSchema returns the first field of the collection schema that
// is flagged as primary key, or an error when no field carries the flag.
func GetPrimaryFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) {
	fields := schema.GetFields()
	for pos := range fields {
		if candidate := fields[pos]; candidate.IsPrimaryKey {
			return candidate, nil
		}
	}

	return nil, errors.New("primary field is not found")
}

// IsFieldSparseFloatVector reports whether the schema contains a field with
// the given ID whose data type satisfies IsSparseFloatVectorType. An unknown
// field ID yields false.
func IsFieldSparseFloatVector(schema *schemapb.CollectionSchema, fieldID int64) bool {
	fieldSchema := GetField(schema, fieldID)
	if fieldSchema == nil {
		return false
	}
	return IsSparseFloatVectorType(fieldSchema.DataType)
}

// GetPartitionKeyFieldSchema returns the first field of the collection schema
// that is flagged as partition key, or an error when no field carries the
// flag.
func GetPartitionKeyFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) {
	fields := schema.GetFields()
	for pos := range fields {
		if candidate := fields[pos]; candidate.IsPartitionKey {
			return candidate, nil
		}
	}

	return nil, errors.New("partition key field is not found")
}

// GetDynamicField returns the first field of the schema marked as dynamic,
// or nil when the schema has none.
func GetDynamicField(schema *schemapb.CollectionSchema) *schemapb.FieldSchema {
	fields := schema.GetFields()
	for pos := range fields {
		if candidate := fields[pos]; candidate.GetIsDynamic() {
			return candidate
		}
	}
	return nil
}

// HasPartitionKey reports whether any field of the collection schema is
// flagged as partition key.
func HasPartitionKey(schema *schemapb.CollectionSchema) bool {
	fields := schema.Fields
	for pos := range fields {
		if fields[pos].IsPartitionKey {
			return true
		}
	}
	return false
}

// IsFieldDataTypeSupportMaterializedView reports whether the field's data type
// satisfies IsIntegerType or IsStringType.
func IsFieldDataTypeSupportMaterializedView(fieldSchema *schemapb.FieldSchema) bool {
	dataType := fieldSchema.DataType
	if IsIntegerType(dataType) {
		return true
	}
	return IsStringType(dataType)
}

// HasClusterKey reports whether any field of the collection schema is flagged
// as clustering key.
func HasClusterKey(schema *schemapb.CollectionSchema) bool {
	fields := schema.Fields
	for pos := range fields {
		if fields[pos].IsClusteringKey {
			return true
		}
	}
	return false
}

// GetPrimaryFieldData returns the first entry of datas whose field ID or field
// name matches the primary field schema. It returns an error when no entry
// matches.
func GetPrimaryFieldData(datas []*schemapb.FieldData, primaryFieldSchema *schemapb.FieldSchema) (*schemapb.FieldData, error) {
	wantID, wantName := primaryFieldSchema.FieldID, primaryFieldSchema.Name

	for _, candidate := range datas {
		if candidate.FieldId != wantID && candidate.FieldName != wantName {
			continue
		}
		return candidate, nil
	}

	return nil, fmt.Errorf("can't find data for primary field: %v", wantName)
}

// GetField returns the first field of the schema whose ID equals fieldID, or
// nil when there is no such field.
func GetField(schema *schemapb.CollectionSchema, fieldID int64) *schemapb.FieldSchema {
	for _, candidate := range schema.GetFields() {
		if candidate.GetFieldID() == fieldID {
			return candidate
		}
	}
	return nil
}

// GetFieldByName returns the first field of the schema whose name equals
// fieldName, or nil when there is no such field.
func GetFieldByName(schema *schemapb.CollectionSchema, fieldName string) *schemapb.FieldSchema {
	for _, candidate := range schema.GetFields() {
		if candidate.GetName() == fieldName {
			return candidate
		}
	}
	return nil
}

// IsPrimaryFieldDataExist reports whether datas contains an entry whose field
// ID or field name matches the primary field schema.
func IsPrimaryFieldDataExist(datas []*schemapb.FieldData, primaryFieldSchema *schemapb.FieldSchema) bool {
	wantID, wantName := primaryFieldSchema.FieldID, primaryFieldSchema.Name

	for _, candidate := range datas {
		if candidate.FieldId == wantID || candidate.FieldName == wantName {
			return true
		}
	}

	return false
}

// IsAutoPKField reports whether the field is a primary key that also has
// AutoID set.
func IsAutoPKField(field *schemapb.FieldSchema) bool {
	if !field.GetIsPrimaryKey() {
		return false
	}
	return field.GetAutoID()
}

// AppendSystemFields returns a clone of schema extended with the row-ID and
// timestamp fields, both typed Int64. The input schema is not modified.
func AppendSystemFields(schema *schemapb.CollectionSchema) *schemapb.CollectionSchema {
	rowIDField := &schemapb.FieldSchema{
		FieldID:      int64(common.RowIDField),
		Name:         common.RowIDFieldName,
		IsPrimaryKey: false,
		DataType:     schemapb.DataType_Int64,
	}
	timestampField := &schemapb.FieldSchema{
		FieldID:      int64(common.TimeStampField),
		Name:         common.TimeStampFieldName,
		IsPrimaryKey: false,
		DataType:     schemapb.DataType_Int64,
	}

	extended := proto.Clone(schema).(*schemapb.CollectionSchema)
	extended.Fields = append(extended.Fields, rowIDField, timestampField)
	return extended
}

// GetId returns the idx-th ID of src together with a size for it: 8 for an
// int64 ID, the string length for a string ID. It panics when src holds
// neither kind of ID.
func GetId(src *schemapb.IDs, idx int) (int, any) {
	switch src.IdField.(type) {
	case *schemapb.IDs_IntId:
		id := src.GetIntId().Data[idx]
		return 8, id
	case *schemapb.IDs_StrId:
		id := src.GetStrId().Data[idx]
		return len(id), id
	}
	panic("unknown pk type")
}

// AppendID appends a single int64 or string ID to dst, creating the ID array
// when dst has none yet. Values of any other type are ignored.
func AppendID(dst *schemapb.IDs, src any) {
	switch value := src.(type) {
	case int64:
		if dst.GetIdField() != nil {
			ints := dst.GetIntId()
			ints.Data = append(ints.Data, value)
			return
		}
		dst.IdField = &schemapb.IDs_IntId{
			IntId: &schemapb.LongArray{Data: []int64{value}},
		}
	case string:
		if dst.GetIdField() != nil {
			strs := dst.GetStrId()
			strs.Data = append(strs.Data, value)
			return
		}
		dst.IdField = &schemapb.IDs_StrId{
			StrId: &schemapb.StringArray{Data: []string{value}},
		}
	default:
		// TODO
	}
}

// AppendIDs appends the idx-th ID of src to dst, creating the ID array when
// dst has none yet. Nothing happens when src holds neither int64 nor string
// IDs.
func AppendIDs(dst *schemapb.IDs, src *schemapb.IDs, idx int) {
	switch src.IdField.(type) {
	case *schemapb.IDs_IntId:
		id := src.GetIntId().Data[idx]
		if dst.GetIdField() != nil {
			ints := dst.GetIntId()
			ints.Data = append(ints.Data, id)
			return
		}
		dst.IdField = &schemapb.IDs_IntId{
			IntId: &schemapb.LongArray{Data: []int64{id}},
		}
	case *schemapb.IDs_StrId:
		id := src.GetStrId().Data[idx]
		if dst.GetIdField() != nil {
			strs := dst.GetStrId()
			strs.Data = append(strs.Data, id)
			return
		}
		dst.IdField = &schemapb.IDs_StrId{
			StrId: &schemapb.StringArray{Data: []string{id}},
		}
	default:
		// TODO
	}
}

// GetSizeOfIDs returns how many IDs data holds; it is 0 when data has no ID
// field or an ID kind other than int64/string.
func GetSizeOfIDs(data *schemapb.IDs) int {
	switch data.GetIdField().(type) {
	case *schemapb.IDs_IntId:
		return len(data.GetIntId().GetData())
	case *schemapb.IDs_StrId:
		return len(data.GetStrId().GetData())
	default:
		// TODO::
		return 0
	}
}

// GetPKSize returns the number of values stored in fieldData when its type is
// Int64 or VarChar, and 0 for every other type.
func GetPKSize(fieldData *schemapb.FieldData) int {
	scalars := fieldData.GetScalars()
	switch fieldData.GetType() {
	case schemapb.DataType_Int64:
		return len(scalars.GetLongData().GetData())
	case schemapb.DataType_VarChar:
		return len(scalars.GetStringData().GetData())
	default:
		return 0
	}
}

// IsPrimaryFieldType reports whether dataType is Int64 or VarChar.
func IsPrimaryFieldType(dataType schemapb.DataType) bool {
	switch dataType {
	case schemapb.DataType_Int64, schemapb.DataType_VarChar:
		return true
	default:
		return false
	}
}

// GetPK returns the idx-th primary key of data as an int64 or string. It
// returns nil when idx is at or beyond the number of IDs, or when data holds
// neither int64 nor string IDs.
func GetPK(data *schemapb.IDs, idx int64) interface{} {
	if idx >= int64(GetSizeOfIDs(data)) {
		return nil
	}
	switch data.GetIdField().(type) {
	case *schemapb.IDs_IntId:
		ints := data.GetIntId().GetData()
		return ints[idx]
	case *schemapb.IDs_StrId:
		strs := data.GetStrId().GetData()
		return strs[idx]
	default:
		return nil
	}
}

// GetData returns row idx of field, chosen by the field's declared type.
// Scalar types yield a single value; dense vector types yield the sub-slice
// holding that row (dim elements for float vectors, dim/8 bytes for binary
// vectors, dim*2 bytes for float16/bfloat16 vectors); sparse float vectors
// yield the row's raw bytes. Types without a case yield nil.
func GetData(field *schemapb.FieldData, idx int) interface{} {
	scalars, vectors := field.GetScalars(), field.GetVectors()
	dim := int(vectors.GetDim())
	// byteRow cuts row idx out of a byte-packed vector column.
	byteRow := func(raw []byte, rowBytes int) []byte {
		return raw[idx*rowBytes : (idx+1)*rowBytes]
	}

	switch field.GetType() {
	case schemapb.DataType_Bool:
		return scalars.GetBoolData().GetData()[idx]
	case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
		return scalars.GetIntData().GetData()[idx]
	case schemapb.DataType_Int64:
		return scalars.GetLongData().GetData()[idx]
	case schemapb.DataType_Float:
		return scalars.GetFloatData().GetData()[idx]
	case schemapb.DataType_Double:
		return scalars.GetDoubleData().GetData()[idx]
	case schemapb.DataType_VarChar:
		return scalars.GetStringData().GetData()[idx]
	case schemapb.DataType_FloatVector:
		return vectors.GetFloatVector().GetData()[idx*dim : (idx+1)*dim]
	case schemapb.DataType_BinaryVector:
		return byteRow(vectors.GetBinaryVector(), dim/8)
	case schemapb.DataType_Float16Vector:
		return byteRow(vectors.GetFloat16Vector(), dim*2)
	case schemapb.DataType_BFloat16Vector:
		return byteRow(vectors.GetBfloat16Vector(), dim*2)
	case schemapb.DataType_SparseFloatVector:
		return vectors.GetSparseFloatVector().Contents[idx]
	default:
		return nil
	}
}

// AppendPKs appends one int64 or string primary key to pks. When pks does not
// currently hold an array of the matching kind, a fresh one is installed
// first. Any other pk type is logged as a warning and ignored.
func AppendPKs(pks *schemapb.IDs, pk interface{}) {
	switch realPK := pk.(type) {
	case int64:
		ints := pks.GetIntId()
		if ints == nil {
			ints = &schemapb.LongArray{Data: make([]int64, 0)}
			pks.IdField = &schemapb.IDs_IntId{IntId: ints}
		}
		ints.Data = append(ints.GetData(), realPK)
	case string:
		strs := pks.GetStrId()
		if strs == nil {
			strs = &schemapb.StringArray{Data: make([]string, 0)}
			pks.IdField = &schemapb.IDs_StrId{StrId: strs}
		}
		strs.Data = append(strs.GetData(), realPK)
	default:
		log.Warn("got unexpected data type of pk when append pks", zap.Any("pk", pk))
	}
}

// SwapPK exchanges the i-th and j-th primary keys of data in place. It does
// nothing when data holds neither int64 nor string IDs.
func SwapPK(data *schemapb.IDs, i, j int) {
	switch f := data.GetIdField().(type) {
	case *schemapb.IDs_IntId:
		ints := f.IntId.Data
		ints[i], ints[j] = ints[j], ints[i]
	case *schemapb.IDs_StrId:
		strs := f.StrId.Data
		strs[i], strs[j] = strs[j], strs[i]
	}
}

// ComparePKInSlice reports whether the i-th primary key of data is strictly
// less than the j-th one. It returns false when data holds neither int64 nor
// string IDs.
func ComparePKInSlice(data *schemapb.IDs, i, j int) bool {
	switch f := data.GetIdField().(type) {
	case *schemapb.IDs_IntId:
		ints := f.IntId.Data
		return ints[i] < ints[j]
	case *schemapb.IDs_StrId:
		strs := f.StrId.Data
		return strs[i] < strs[j]
	default:
		return false
	}
}

// ComparePK reports whether pkA is strictly less than pkB.
//
// Both keys must be int64 or both must be string. Any other combination
// (different kinds, nil, or unsupported types) yields false instead of
// panicking on a failed type assertion.
func ComparePK(pkA, pkB interface{}) bool {
	switch a := pkA.(type) {
	case int64:
		b, ok := pkB.(int64)
		return ok && a < b
	case string:
		b, ok := pkB.(string)
		return ok && a < b
	}
	return false
}

// ResultWithID is the type constraint used by SelectMinPK: a result that
// exposes its primary keys and says whether more results exist beyond them.
type ResultWithID interface {
	// GetIds returns the primary keys carried by the result.
	GetIds() *schemapb.IDs
	// GetHasMoreResult is consulted by SelectMinPK once a cursor has passed
	// the last ID, to decide whether the reduce should be told to stop.
	GetHasMoreResult() bool
}

// ResultWithTimestamp is the additional type constraint used by
// SelectMinPKWithTimestamp: a result that exposes one timestamp per row.
type ResultWithTimestamp interface {
	// GetTimestamps returns the timestamps; SelectMinPKWithTimestamp indexes
	// them with the same cursor it uses for the IDs.
	GetTimestamps() []int64
}

// SelectMinPK picks, among results, the one whose primary key at its cursor
// position is the smallest.
//
// It returns the index of that result (-1 when no result has a key at its
// cursor) and a drain flag. The flag is true when at least one result has been
// fully consumed while reporting that more matches exist; the reduce should
// then stop, because better keys may only arrive in a following iteration.
//
// The first key seen of each kind is always accepted, so extreme values such
// as math.MaxInt64 are selectable; later keys replace it only when strictly
// smaller, which keeps the earliest result on ties.
func SelectMinPK[T ResultWithID](results []T, cursors []int64) (int, bool) {
	var (
		sel         = -1
		drainResult = false

		firstInt = true
		minIntPK int64

		firstStr = true
		minStrPK string
	)
	for i, cursor := range cursors {
		ids := results[i].GetIds()
		// if cursor has run out of all results from one result and this result has more matched results
		// in this case we have tell reduce to stop because better results may be retrieved in the following iteration
		if int(cursor) >= GetSizeOfIDs(ids) && results[i].GetHasMoreResult() {
			drainResult = true
			continue
		}

		switch pk := GetPK(ids, cursor).(type) {
		case string:
			if firstStr || pk < minStrPK {
				firstStr = false
				minStrPK = pk
				sel = i
			}
		case int64:
			if firstInt || pk < minIntPK {
				firstInt = false
				minIntPK = pk
				sel = i
			}
		default:
			// exhausted result or unknown key kind: nothing to compare
			continue
		}
	}

	return sel, drainResult
}

// SelectMinPKWithTimestamp picks, among results, the one whose primary key at
// its cursor position is the smallest; when a key equals the current minimum,
// the entry with the larger timestamp wins.
//
// It returns the index of the chosen result (-1 when nothing was chosen) and
// a drain flag that is true when at least one result has been fully consumed
// while reporting that more matches exist.
func SelectMinPKWithTimestamp[T interface {
	ResultWithID
	ResultWithTimestamp
}](results []T, cursors []int64) (int, bool) {
	var (
		sel                = -1
		drainResult        = false
		maxTimestamp int64 = 0
		minIntPK     int64 = math.MaxInt64

		firstStr = true
		minStrPK string
	)
	for i, cursor := range cursors {
		current := results[i]
		timestamps := current.GetTimestamps()
		ids := current.GetIds()
		// A fully consumed result that still has more matches means the
		// reduce must stop: better keys may only show up in a later
		// iteration.
		if int(cursor) >= GetSizeOfIDs(ids) && (current.GetHasMoreResult()) {
			drainResult = true
			continue
		}

		switch pk := GetPK(ids, cursor).(type) {
		case string:
			ts := timestamps[cursor]
			better := firstStr || pk < minStrPK || (pk == minStrPK && ts > maxTimestamp)
			if !better {
				continue
			}
			firstStr = false
			minStrPK, maxTimestamp, sel = pk, ts, i
		case int64:
			ts := timestamps[cursor]
			better := pk < minIntPK || (pk == minIntPK && ts > maxTimestamp)
			if !better {
				continue
			}
			minIntPK, maxTimestamp, sel = pk, ts, i
		}
	}

	return sel, drainResult
}

// AppendGroupByValue appends one group-by value to dstResData's
// GroupByFieldValue, creating that field (typed srcDataType) and its scalar
// array on first use.
//
// groupByVal is type-asserted according to srcDataType: bool for Bool, int32
// for Int8/Int16/Int32, int64 for Int64 and string for VarChar. Any other
// data type is logged and returned as an error.
func AppendGroupByValue(dstResData *schemapb.SearchResultData,
	groupByVal interface{}, srcDataType schemapb.DataType,
) error {
	if dstResData.GroupByFieldValue == nil {
		dstResData.GroupByFieldValue = &schemapb.FieldData{
			Type: srcDataType,
			Field: &schemapb.FieldData_Scalars{
				Scalars: &schemapb.ScalarField{},
			},
		}
	}
	scalars := dstResData.GroupByFieldValue.GetScalars()

	switch srcDataType {
	case schemapb.DataType_Bool:
		bools := scalars.GetBoolData()
		if bools == nil {
			bools = &schemapb.BoolArray{Data: []bool{}}
			scalars.Data = &schemapb.ScalarField_BoolData{BoolData: bools}
		}
		bools.Data = append(bools.Data, groupByVal.(bool))
	case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
		ints := scalars.GetIntData()
		if ints == nil {
			ints = &schemapb.IntArray{Data: []int32{}}
			scalars.Data = &schemapb.ScalarField_IntData{IntData: ints}
		}
		ints.Data = append(ints.Data, groupByVal.(int32))
	case schemapb.DataType_Int64:
		longs := scalars.GetLongData()
		if longs == nil {
			longs = &schemapb.LongArray{Data: []int64{}}
			scalars.Data = &schemapb.ScalarField_LongData{LongData: longs}
		}
		longs.Data = append(longs.Data, groupByVal.(int64))
	case schemapb.DataType_VarChar:
		strs := scalars.GetStringData()
		if strs == nil {
			strs = &schemapb.StringArray{Data: []string{}}
			scalars.Data = &schemapb.ScalarField_StringData{StringData: strs}
		}
		strs.Data = append(strs.Data, groupByVal.(string))
	default:
		typeName := srcDataType.String()
		log.Error("Not supported field type from group_by value field", zap.String("field type",
			typeName))
		return fmt.Errorf("not supported field type from group_by value field: %s",
			typeName)
	}
	return nil
}

// appendSparseFloatArray appends all rows of src to dst and raises dst.Dim to
// src.Dim when the latter is larger. An src without rows leaves dst untouched.
func appendSparseFloatArray(dst, src *schemapb.SparseFloatArray) {
	if len(src.Contents) > 0 {
		if src.Dim > dst.Dim {
			dst.Dim = src.Dim
		}
		dst.Contents = append(dst.Contents, src.Contents...)
	}
}

// appendSparseFloatArraySingleRow appends row idx of src to dst, raises
// dst.Dim when the row's dim is larger, and returns the row's length in bytes
// (0 when the row's dim is 0).
func appendSparseFloatArraySingleRow(dst, src *schemapb.SparseFloatArray, idx int64) int64 {
	row := src.Contents[idx]
	dst.Contents = append(dst.Contents, row)

	switch rowDim := SparseFloatRowDim(row); {
	case rowDim == 0:
		return 0
	case rowDim > dst.Dim:
		dst.Dim = rowDim
	}
	return int64(len(row))
}

// trimSparseFloatArray drops the last row of vec, if there is one.
//
// Dim is deliberately left as is: recomputing it would require scanning every
// remaining row, and a Dim that is larger than necessary does not affect
// correctness.
func trimSparseFloatArray(vec *schemapb.SparseFloatArray) {
	if rowCount := len(vec.Contents); rowCount > 0 {
		vec.Contents = vec.Contents[:rowCount-1]
	}
}

// ValidateSparseFloatRows checks every row and returns the first problem
// found, or nil. A row is rejected when it is nil, when its length is not a
// multiple of 8 bytes, when an index equals math.MaxUint32, when indices are
// not strictly increasing, when a value fails VerifyFloat, or when a value is
// negative.
func ValidateSparseFloatRows(rows ...[]byte) error {
	for _, row := range rows {
		switch {
		case row == nil:
			return errors.New("nil sparse float vector")
		case len(row)%8 != 0:
			return fmt.Errorf("invalid data length in sparse float vector: %d", len(row))
		}

		elemCount := SparseFloatRowElementCount(row)
		for pos := 0; pos < elemCount; pos++ {
			idx := SparseFloatRowIndexAt(row, pos)
			if idx == math.MaxUint32 {
				return errors.New("invalid index in sparse float vector: must be less than 2^32-1")
			}
			if pos > 0 && SparseFloatRowIndexAt(row, pos-1) >= idx {
				return errors.New("unsorted or same indices in sparse float vector")
			}

			val := SparseFloatRowValueAt(row, pos)
			if err := VerifyFloat(float64(val)); err != nil {
				return err
			}
			if val < 0 {
				return errors.New("negative value in sparse float vector")
			}
		}
	}
	return nil
}

// SparseFloatRowUtils

// SparseFloatRowElementCount returns the number of complete 8-byte elements in
// row; a nil row has none.
func SparseFloatRowElementCount(row []byte) int {
	const elementBytes = 8
	return len(row) / elementBytes
}

// SparseFloatRowIndexAt decodes the index of element idx, i.e. the uint32
// stored at byte offset idx*8 of row in common.Endian byte order.
// It does not check for out-of-range access.
func SparseFloatRowIndexAt(row []byte, idx int) uint32 {
	element := row[idx*8:]
	return common.Endian.Uint32(element)
}

// SparseFloatRowValueAt decodes the value of element idx, i.e. the float32
// whose bits are stored at byte offset idx*8+4 of row in common.Endian byte
// order. It does not check for out-of-range access.
func SparseFloatRowValueAt(row []byte, idx int) float32 {
	bits := common.Endian.Uint32(row[idx*8+4:])
	return math.Float32frombits(bits)
}

// SparseFloatRowSetAt writes element pos of row: the index as a little-endian
// uint32 at byte offset pos*8, followed by the bits of value as a
// little-endian uint32 at byte offset pos*8+4. It does not check for
// out-of-range access.
func SparseFloatRowSetAt(row []byte, pos int, idx uint32, value float32) {
	element := row[pos*8:]
	binary.LittleEndian.PutUint32(element, idx)
	binary.LittleEndian.PutUint32(element[4:], math.Float32bits(value))
}

// SortSparseFloatRow returns copies of indices and values reordered so that
// the indices are ascending, each value staying paired with its index. The
// inputs are not modified.
func SortSparseFloatRow(indices []uint32, values []float32) ([]uint32, []float32) {
	n := len(indices)

	// perm[k] is the input position of the element that ends up at output
	// position k.
	perm := make([]int, n)
	for pos := range perm {
		perm[pos] = pos
	}
	sort.Slice(perm, func(a, b int) bool {
		return indices[perm[a]] < indices[perm[b]]
	})

	sortedIndices, sortedValues := make([]uint32, n), make([]float32, n)
	for outPos, inPos := range perm {
		sortedIndices[outPos] = indices[inPos]
		sortedValues[outPos] = values[inPos]
	}

	return sortedIndices, sortedValues
}

func CreateAndSortSparseFloatRow(sparse map[uint32]float32) []byte {
	row := make([]byte, len(sparse)*8)
	data := lo.MapToSlice(sparse, func(indices uint32, value float32) Pair[uint32, float32] {
		return Pair[uint32, float32]{indices, value}
	})
	sort.Slice(data, func(i, j int) bool { return data[i].A < data[j].A })
	for i := 0; i < len(data); i++ {
		SparseFloatRowSetAt(row, i, data[i].A, data[i].B)
	}
	return row
}

// CreateSparseFloatRow encodes the given indices and values, in the order
// given, as a sparse float row of len(indices)*8 bytes. No sorting or
// validation is performed here.
func CreateSparseFloatRow(indices []uint32, values []float32) []byte {
	row := make([]byte, len(indices)*8)
	for pos, index := range indices {
		SparseFloatRowSetAt(row, pos, index, values[pos])
	}
	return row
}

// CreateSparseFloatRowFromMap converts a decoded JSON object into the byte
// representation of a sparse float row.
//
// accepted format:
//   - {"indices": [1, 2, 3], "values": [0.1, 0.2, 0.3]}    # format1
//   - {"1": 0.1, "2": 0.2, "3": 0.3}                       # format2
//
// we don't require the indices to be sorted from user input, but the returned
// byte representation must have indices sorted
//
// An empty input yields an empty row. An error is returned when an index or
// value has an unsupported type, when an index is negative, fractional or too
// large, when a value fails VerifyFloat or exceeds math.MaxFloat32, when only
// one of "indices"/"values" is a JSON array, when the two arrays differ in
// length, or when the assembled row fails ValidateSparseFloatRows.
func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
	var indices []uint32
	var values []float32

	if len(input) == 0 {
		// for empty json input, return empty sparse row
		return CreateSparseFloatRow(indices, values), nil
	}

	// getValue converts one JSON value into a float32 element value.
	getValue := func(key interface{}) (float32, error) {
		var val float64
		switch v := key.(type) {
		case int:
			val = float64(v)
		case float64:
			val = v
		case json.Number:
			if num, err := strconv.ParseFloat(v.String(), 64); err == nil {
				val = num
			} else {
				return 0, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(v))
			}
		default:
			return 0, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(key))
		}
		if VerifyFloat(val) != nil {
			return 0, fmt.Errorf("invalid value in JSON: %v", val)
		}
		if val > math.MaxFloat32 {
			return 0, fmt.Errorf("value too large in JSON: %v", val)
		}
		return float32(val), nil
	}

	// getIndex converts one JSON value into a uint32 element index.
	getIndex := func(key interface{}) (uint32, error) {
		var idx int64
		switch v := key.(type) {
		case int:
			idx = int64(v)
		case float64:
			// check if the float64 is actually an integer
			if v != float64(int64(v)) {
				return 0, fmt.Errorf("invalid index in JSON: %v", v)
			}
			idx = int64(v)
		case json.Number:
			if num, err := strconv.ParseInt(v.String(), 0, 64); err == nil {
				idx = num
			} else {
				return 0, err
			}
		default:
			return 0, fmt.Errorf("invalid index type in JSON: %s", reflect.TypeOf(key))
		}
		// A negative index must be rejected here: converting it to uint32
		// would wrap it around to a large, seemingly valid index.
		if idx < 0 {
			return 0, fmt.Errorf("negative index in JSON: %v", idx)
		}
		if idx >= math.MaxUint32 {
			return 0, fmt.Errorf("index too large in JSON: %v", idx)
		}
		return uint32(idx), nil
	}

	jsonIndices, ok1 := input["indices"].([]interface{})
	jsonValues, ok2 := input["values"].([]interface{})

	if ok1 && ok2 {
		// try format1
		for _, idx := range jsonIndices {
			index, err := getIndex(idx)
			if err != nil {
				return nil, err
			}
			indices = append(indices, index)
		}
		for _, val := range jsonValues {
			value, err := getValue(val)
			if err != nil {
				return nil, err
			}
			values = append(values, value)
		}
	} else if !ok1 && !ok2 {
		// try format2
		for k, v := range input {
			idx, err := strconv.ParseUint(k, 0, 32)
			if err != nil {
				return nil, err
			}

			val, err := getValue(v)
			if err != nil {
				return nil, err
			}

			indices = append(indices, uint32(idx))
			values = append(values, val)
		}
	} else {
		return nil, errors.New("invalid JSON input")
	}

	if len(indices) != len(values) {
		return nil, errors.New("indices and values length mismatch")
	}

	sortedIndices, sortedValues := SortSparseFloatRow(indices, values)
	row := CreateSparseFloatRow(sortedIndices, sortedValues)
	if err := ValidateSparseFloatRows(row); err != nil {
		return nil, err
	}
	return row, nil
}

// CreateSparseFloatRowFromJSON decodes input as a JSON object and converts it
// into a sparse float row via CreateSparseFloatRowFromMap. A decoding failure
// is returned as is.
func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) {
	decoder := json.NewDecoder(bytes.NewReader(input))
	decoder.DisallowUnknownFields()

	var vec map[string]interface{}
	if err := decoder.Decode(&vec); err != nil {
		return nil, err
	}
	return CreateSparseFloatRowFromMap(vec)
}

// dim of a sparse float vector is the maximum/last index + 1.
// for an empty row, dim is 0.
func SparseFloatRowDim(row []byte) int64 {
	if len(row) == 0 {
		return 0
	}
	return int64(SparseFloatRowIndexAt(row, SparseFloatRowElementCount(row)-1)) + 1
}

// EstimateSparseVectorNNZFromPlaceholderGroup estimates the total number of
// non-zero elements of all sparse vectors in a serialized PlaceholderGroup:
// the byte length minus an assumed overhead of max(10, nq*3) bytes, divided
// by 8 bytes per element.
// This is a rough estimate, and should be used only for statistics.
func EstimateSparseVectorNNZFromPlaceholderGroup(placeholderGroup []byte, nq int) int {
	overhead := int(math.Max(10, float64(nq*3)))
	payloadBytes := len(placeholderGroup) - overhead
	return payloadBytes / 8
}
